import pyspark
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
import pyspark.sql.types as T
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.ticker as tck
import statistics as stats
%matplotlib inline
# Create the Spark session for this analysis, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("WeatherData_Analysis")
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary
spark
# Explicit schema for the weather CSV, listed as (column name, Spark type)
schema = T.StructType([
    T.StructField(column_name, column_type)
    for column_name, column_type in (
        ("Date", T.DateType()),
        ("MinTemperature", T.DoubleType()),
        ("MaxTemperature", T.DoubleType()),
        ("Precipitation", T.DoubleType()),
        ("RelativeHumidity", T.DoubleType()),
        ("State", T.StringType()),
        ("District", T.StringType()),
    )
])
# Read the district-level weather CSV (with header row) using the explicit schema
weatherData = spark.read.csv(
    "Datasets/Output/WeatherDistrictData.csv",
    schema=schema,
    header=True,
)
# Confirm the column names and types that were applied
weatherData.printSchema()
# Add the derived columns used as grouping keys
# Month: "<year>-<two-digit month>" label for aggregating at month level
# District_State: "<District>, <State>" label for aggregating at district level
weatherData = (
    weatherData
    .withColumn(
        "Month",
        F.concat_ws(
            "-",
            F.year("Date"),
            # zero-pad the month number to two characters (3 -> "03", 11 -> "11")
            F.lpad(F.month("Date").cast("string"), 2, "0"),
        ),
    )
    .withColumn("District_State", F.concat_ws(", ", "District", "State"))
)
# Check schema after adding the columns
weatherData.printSchema()
#persist: mark the DataFrame for caching; every aggregation below is derived from it
weatherData.persist()
#number of partitions of the underlying RDD
weatherData.rdd.getNumPartitions()
#Check Plan
weatherData.explain()
#check number of rows
#we have data for 2000-2013 i.e. 5114 days
# and 535 districts
#We should have 2735990 rows (5114 days x 535 districts)
weatherData.count()
#check data: print a sample of rows
weatherData.show()
# Sorted list of the distinct values in the State column; sorted() already
# returns a list, so no extra list() wrapper or intermediate list is needed
states = sorted(row["State"] for row in weatherData.select("State").distinct().collect())
def genBarPlot(df, state, column):
    """Draw a bar chart of one metric against the Month column.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``Month`` column and the column named by ``column``.
        Rows are plotted in the order given.
    state : str
        Name used in the plot title.
    column : str
        Column plotted on the y-axis; also used as the y-axis label.
    """
    point_count = len(df.Month)
    plt.figure(figsize=(30, 12))
    plt.bar(df.Month, df[column], align='edge', width=0.4)
    plt.xlabel("Time", size=16)
    plt.ylabel(column, size=16)
    # Tick every third position. The range is derived from the number of
    # plotted points instead of a fixed upper bound, so no ticks are created
    # beyond the x-limits set below.
    plt.xticks(ticks=np.arange(0, point_count, 3), rotation=90, size=16)
    plt.yticks(size=16)
    plt.xlim([0, point_count])
    plt.title(state + " Monthly " + column, size=20)
    plt.show()
def genDualLinePlot(df, state, column1, column2, yaxis="", color1="red", color2="blue"):
    """Draw two metrics as lines against the Month column on one set of axes.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``Month`` column plus ``column1`` and ``column2``.
        Rows are plotted in the order given.
    state : str
        Name used in the plot title.
    column1, column2 : str
        Columns to plot; each is also used as its line's label.
    yaxis : str
        Label for the y-axis.
    color1, color2 : str
        Line colours for ``column1`` and ``column2`` respectively.
    """
    point_count = len(df.Month)
    plt.figure(figsize=(30, 12))
    plt.plot(df.Month, df[column1], color=color1, label=column1)
    plt.plot(df.Month, df[column2], color=color2, label=column2)
    plt.xlabel("Time", size=16)
    plt.ylabel(yaxis, size=16)
    # Tick every third position. The range is derived from the number of
    # plotted points instead of a fixed upper bound, so no ticks are created
    # beyond the x-limits set below.
    plt.xticks(ticks=np.arange(0, point_count, 3), rotation=90, size=16)
    plt.yticks(size=16)
    plt.xlim([0, point_count])
    plt.title(state + " Monthly " + column1 + " and " + column2, size=20)
    plt.show()
def genPlots(dataframe, state=None):
    """Print summary statistics and draw the standard plots for one region.

    Parameters
    ----------
    dataframe : Spark DataFrame
        Must provide ``Month``, ``TotalPrecipitation``,
        ``AverageRelativeHumidity``, ``AverageMaxTemperature`` and
        ``AverageMinTemperature``; a ``State`` column is also needed when
        ``state`` is given.
    state : str, optional
        When truthy, only rows whose ``State`` equals this value are used.
        Otherwise all rows are used and the output is labelled "India".
    """
    if state:
        selected = dataframe.filter(F.col("State") == state)
    else:
        selected = dataframe
        state = "India"

    # Collect to pandas and order by the Month label before plotting
    df = selected.toPandas().sort_values("Month")

    print("Weather Information for " + state)
    print(df.describe())

    for metric in ("TotalPrecipitation", "AverageRelativeHumidity"):
        genBarPlot(df, state, metric)
    genDualLinePlot(
        df,
        state,
        "AverageMaxTemperature",
        "AverageMinTemperature",
        yaxis="Temperature",
    )
# Aggregate to one row per Month across all rows.
# Each aggregate is named directly with alias() rather than selecting Spark's
# auto-generated column names ("sum(Precipitation)", "avg(...)") afterwards.
# Resulting columns, in order: Month, TotalPrecipitation,
# AverageMinTemperature, AverageMaxTemperature, AverageRelativeHumidity.
monthlyDF = (
    weatherData
    .groupBy("Month")
    .agg(
        F.sum("Precipitation").alias("TotalPrecipitation"),
        F.mean("MinTemperature").alias("AverageMinTemperature"),
        F.mean("MaxTemperature").alias("AverageMaxTemperature"),
        F.mean("RelativeHumidity").alias("AverageRelativeHumidity"),
    )
)
# Generate plots for the whole data set (no state filter)
genPlots(monthlyDF)
Aggregated at Month Level
# Group the rows by state and month; aggregations are applied in the next step
monthlyStateDF = weatherData.groupBy("State", "Month")
Aggregations used:
# Apply the aggregations to the grouped data, one row per (State, Month).
# Each aggregate is named directly with alias() rather than selecting Spark's
# auto-generated column names afterwards.
# Resulting columns, in order: State, Month, TotalPrecipitation,
# AverageMinTemperature, AverageMaxTemperature, AverageRelativeHumidity.
monthlyStateDF = monthlyStateDF.agg(
    F.sum("Precipitation").alias("TotalPrecipitation"),
    F.mean("MinTemperature").alias("AverageMinTemperature"),
    F.mean("MaxTemperature").alias("AverageMaxTemperature"),
    F.mean("RelativeHumidity").alias("AverageRelativeHumidity"),
)
# Check schema
monthlyStateDF.printSchema()
# Mark for caching (it is filtered once per state below), then check the execution plan
monthlyStateDF.persist()
monthlyStateDF.explain()
# Summary statistics and plots for every state
for state in states:
    genPlots(monthlyStateDF, state)
Aggregated at monthly level
Aggregations used:
# Aggregate to one row per (Month, District_State).
# Each aggregate is named directly with alias() rather than selecting Spark's
# auto-generated column names afterwards. The aggregates are listed in the
# output column order: Month, District_State, TotalPrecipitation,
# MinTemperature, MaxTemperature, MaximumRelativeHumidity, MinimumRelativeHumidity.
districtsDF = (
    weatherData
    .groupBy("Month", "District_State")
    .agg(
        F.sum("Precipitation").alias("TotalPrecipitation"),
        F.min("MinTemperature").alias("MinTemperature"),
        F.max("MaxTemperature").alias("MaxTemperature"),
        F.max("RelativeHumidity").alias("MaximumRelativeHumidity"),
        F.min("RelativeHumidity").alias("MinimumRelativeHumidity"),
    )
)
# Mark for caching; the per-month rankings below all read from it
districtsDF.persist()
# Ten rows with the highest MaxTemperature within each Month
@F.pandas_udf("Month string,District_State string,MaxTemperature double",F.PandasUDFType.GROUPED_MAP)
def get10MaxTemp(pdf):
    """Return the 10 rows of one Month group with the largest MaxTemperature."""
    return pdf.nlargest(n=10, columns="MaxTemperature")


hottestDistricts = (
    districtsDF
    .select("Month", "District_State", "MaxTemperature")
    .groupBy("Month")
    .apply(get10MaxTemp)
)
hottestDistricts
# Ten rows with the lowest MinTemperature within each Month
@F.pandas_udf("Month string,District_State string,MinTemperature double",F.PandasUDFType.GROUPED_MAP)
def get10MinTemp(pdf):
    """Return the 10 rows of one Month group with the smallest MinTemperature."""
    return pdf.nsmallest(n=10, columns="MinTemperature")


coldestDistricts = (
    districtsDF
    .select("Month", "District_State", "MinTemperature")
    .groupBy("Month")
    .apply(get10MinTemp)
)
coldestDistricts
# Ten rows with the highest TotalPrecipitation within each Month
@F.pandas_udf("Month string,District_State string,TotalPrecipitation double",F.PandasUDFType.GROUPED_MAP)
def get10MaxPreci(pdf):
    """Return the 10 rows of one Month group with the largest TotalPrecipitation."""
    return pdf.nlargest(n=10, columns="TotalPrecipitation")


wettestDistricts = (
    districtsDF
    .select("Month", "District_State", "TotalPrecipitation")
    .groupBy("Month")
    .apply(get10MaxPreci)
)
wettestDistricts
# Ten rows with the lowest TotalPrecipitation within each Month
@F.pandas_udf("Month string,District_State string,TotalPrecipitation double",F.PandasUDFType.GROUPED_MAP)
def get10MinPreci(pdf):
    """Return the 10 rows of one Month group with the smallest TotalPrecipitation."""
    return pdf.nsmallest(n=10, columns="TotalPrecipitation")


driestDistricts = (
    districtsDF
    .select("Month", "District_State", "TotalPrecipitation")
    .groupBy("Month")
    .apply(get10MinPreci)
)
driestDistricts
# Ten rows with the highest MaximumRelativeHumidity within each Month
@F.pandas_udf("Month string,District_State string,MaximumRelativeHumidity double",F.PandasUDFType.GROUPED_MAP)
def get10MaxHumid(pdf):
    """Return the 10 rows of one Month group with the largest MaximumRelativeHumidity."""
    return pdf.nlargest(n=10, columns="MaximumRelativeHumidity")


mostHumidDistricts = (
    districtsDF
    .select("Month", "District_State", "MaximumRelativeHumidity")
    .groupBy("Month")
    .apply(get10MaxHumid)
)
mostHumidDistricts
# Ten rows with the lowest MinimumRelativeHumidity within each Month
@F.pandas_udf("Month string,District_State string,MinimumRelativeHumidity double",F.PandasUDFType.GROUPED_MAP)
def get10MinHumid(pdf):
    """Return the 10 rows of one Month group with the smallest MinimumRelativeHumidity."""
    return pdf.nsmallest(n=10, columns="MinimumRelativeHumidity")


leastHumidDistricts = (
    districtsDF
    .select("Month", "District_State", "MinimumRelativeHumidity")
    .groupBy("Month")
    .apply(get10MinHumid)
)
leastHumidDistricts
Link: Tableau Viz
%%HTML
<!-- Tableau Public embed for the viz "WeatherAnalysis_16014109382880/IndianWeatherOvertheYears". The noscript block shows a static image fallback; the inline script sizes the viz object from the placeholder's offsetWidth and then loads viz_v1.js from public.tableau.com. -->
<div class='tableauPlaceholder' id='viz1601471864533' style='position: relative'><noscript><a href='#'><img alt=' ' src='https://public.tableau.com/static/images/We/WeatherAnalysis_16014109382880/IndianWeatherOvertheYears/1_rss.png' style='border: none' /></a></noscript><object class='tableauViz' style='display:none;'><param name='host_url' value='https%3A%2F%2Fpublic.tableau.com%2F' /> <param name='embed_code_version' value='3' /> <param name='site_root' value='' /><param name='name' value='WeatherAnalysis_16014109382880/IndianWeatherOvertheYears' /><param name='tabs' value='yes' /><param name='toolbar' value='yes' /><param name='static_image' value='https://public.tableau.com/static/images/We/WeatherAnalysis_16014109382880/IndianWeatherOvertheYears/1.png' /> <param name='animate_transition' value='yes' /><param name='display_static_image' value='yes' /><param name='display_spinner' value='yes' /><param name='display_overlay' value='yes' /><param name='display_count' value='yes' /><param name='language' value='en' /><param name='filter' value='publish=yes' /></object></div> <script type='text/javascript'> var divElement = document.getElementById('viz1601471864533'); var vizElement = divElement.getElementsByTagName('object')[0]; if ( divElement.offsetWidth > 800 ) { vizElement.style.width='900px';vizElement.style.height='650px';} else if ( divElement.offsetWidth > 500 ) { vizElement.style.width='900px';vizElement.style.height='650px';} else { vizElement.style.minWidth='900px';vizElement.style.maxWidth='100%';vizElement.style.height='1050px';} var scriptElement = document.createElement('script'); scriptElement.src = 'https://public.tableau.com/javascripts/api/viz_v1.js'; vizElement.parentNode.insertBefore(scriptElement, vizElement); </script>